{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "Reading and writing data -- Tour of Beam",
      "provenance": [],
      "collapsed_sections": [],
      "toc_visible": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "metadata": {
        "cellView": "form",
        "id": "upmJn_DjcThx"
      },
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5UC_aGanx6oE"
      },
      "source": [
        "# Reading and writing data -- _Tour of Beam_\n",
        "\n",
        "So far we've learned some of the basic transforms like\n",
        "[`Map`](https://beam.apache.org/documentation/transforms/python/elementwise/map),\n",
        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap),\n",
        "[`Filter`](https://beam.apache.org/documentation/transforms/python/elementwise/filter),\n",
        "[`Combine`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally), and\n",
        "[`GroupByKey`](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey).\n",
        "These allow us to transform data in any way, but so far we've used\n",
        "[`Create`](https://beam.apache.org/documentation/transforms/python/other/create)\n",
        "to get data from an in-memory\n",
        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable), like a `list`.\n",
        "\n",
        "This works well for experimenting with small datasets. For larger datasets we can use `Source` transforms to read data and `Sink` transforms to write data.\n",
        "If there are no built-in `Source` or `Sink` transforms, we can also easily create our custom I/O transforms.\n",
        "\n",
        "Let's create some data files and see how we can read them in Beam."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "R_Yhoc6N_Flg"
      },
      "source": [
        "# Install apache-beam with pip.\n",
        "!pip install --quiet apache-beam\n",
        "\n",
        "# Create a directory for our data files.\n",
        "!mkdir -p data"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "sQUUi4H9s-g2",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "outputId": "da02ce82-61ce-43e2-a5f0-21b06df76bd6"
      },
      "source": [
        "%%writefile data/my-text-file-1.txt\n",
        "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
        "Each line in the file is one element in the PCollection."
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Writing data/my-text-file-1.txt\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "BWVVeTSOlKug",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "outputId": "2f0ad045-0af3-4ca7-f5a3-1f348b5c1517"
      },
      "source": [
        "%%writefile data/my-text-file-2.txt\n",
        "There are no guarantees on the order of the elements.\n",
        "ฅ^•ﻌ•^ฅ"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Writing data/my-text-file-2.txt\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "NhCws6ncbDJG",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "outputId": "0b5e0dc6-43ee-4786-c1d5-984fd422e445"
      },
      "source": [
        "%%writefile data/penguins.csv\n",
        "species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g\n",
        "0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667\n",
        "0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556\n",
        "1,0.5236363636363636,0.5714285714285713,0.3389830508474576,0.2222222222222222\n",
        "1,0.6509090909090909,0.7619047619047619,0.4067796610169492,0.3333333333333333\n",
        "2,0.509090909090909,0.011904761904761862,0.6610169491525424,0.5\n",
        "2,0.6509090909090909,0.38095238095238104,0.9830508474576272,0.8333333333333334"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Writing data/penguins.csv\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_OkWHiAvpWDZ"
      },
      "source": [
        "# Reading from text files\n",
        "\n",
        "We can use the\n",
        "[`ReadFromText`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText)\n",
        "transform to read text files into `str` elements.\n",
        "\n",
        "It takes a\n",
        "[_glob pattern_](https://en.wikipedia.org/wiki/Glob_%28programming%29)\n",
        "as an input, and reads all the files that match that pattern.\n",
        "It returns one element for each line in the file.\n",
        "\n",
        "For example, in the pattern `data/*.txt`, the `*` is a wildcard that matches anything. This pattern matches all the files in the `data/` directory with a `.txt` extension."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "xDXdE9uysriw",
        "outputId": "3ef9f7a9-8291-42a1-be03-71e50de266f5"
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "input_files = 'data/*.txt'\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Read files' >> beam.io.ReadFromText(input_files)\n",
        "      | 'Print contents' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "This is just a plain text file, UTF-8 strings are allowed 🎉.\n",
            "Each line in the file is one element in the PCollection.\n",
            "There are no guarantees on the order of the elements.\n",
            "ฅ^•ﻌ•^ฅ\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9-2wmzEWsdrb"
      },
      "source": [
        "# Writing to text files\n",
        "\n",
        "We can use the\n",
        "[`WriteToText`](https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) transform to write `str` elements into text files.\n",
        "\n",
        "It takes a _file path prefix_ as an input, and it writes the all `str` elements into one or more files with filenames starting with that prefix. You can optionally pass a `file_name_suffix` as well, usually used for the file extension. Each element goes into its own line in the output files."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "nkPlfoTfz61I"
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "output_file_name_prefix = 'outputs/file'\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Create file lines' >> beam.Create([\n",
        "          'Each element must be a string.',\n",
        "          'It writes one element per line.',\n",
        "          'There are no guarantees on the line order.',\n",
        "          'The data might be written into multiple files.',\n",
        "      ])\n",
        "      | 'Write to files' >> beam.io.WriteToText(\n",
        "          output_file_name_prefix,\n",
        "          file_name_suffix='.txt')\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "8au0yJSd1itt",
        "outputId": "4822458b-2724-42e9-c71f-280a82d505d6"
      },
      "source": [
        "# Lets look at the output files and contents.\n",
        "!head outputs/file*.txt"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Each element must be a string.\n",
            "It writes one element per line.\n",
            "There are no guarantees on the line order.\n",
            "The data might be written into multiple files.\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "21CCdZispqYK"
      },
      "source": [
        "# Reading data\n",
        "\n",
        "Your data might reside in various input formats. Take a look at the\n",
        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
        "page for a list of all the available I/O transforms in Beam.\n",
        "\n",
        "If none of those work for you, you might need to create your own input transform.\n",
        "\n",
        "> ℹ️ For a more in-depth guide, take a look at the\n",
        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7dQEym1QRG4y"
      },
      "source": [
        "## Reading from an `iterable`\n",
        "\n",
        "The easiest way to create elements is using\n",
        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
        "\n",
        "A common way is having a [`generator`](https://docs.python.org/3/glossary.html#term-generator) function. This could take an input and _expand_ it into a large amount of elements. The nice thing about `generator`s is that they don't have to fit everything into memory like a `list`, they simply\n",
        "[`yield`](https://docs.python.org/3/reference/simple_stmts.html#yield)\n",
        "elements as they process them.\n",
        "\n",
        "For example, let's define a `generator` called `count`, that `yield`s the numbers from `0` to `n`. We use `Create` for the initial `n` value(s) and then exapand them with `FlatMap`."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "wR6WY6wOMVhb",
        "outputId": "f90ae0a1-0cb4-4f25-fa93-10f9de856a95"
      },
      "source": [
        "import apache_beam as beam\n",
        "from typing import Iterable\n",
        "\n",
        "def count(n: int) -> Iterable[int]:\n",
        "  for i in range(n):\n",
        "    yield i\n",
        "\n",
        "n = 5\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Create inputs' >> beam.Create([n])\n",
        "      | 'Generate elements' >> beam.FlatMap(count)\n",
        "      | 'Print elements' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "0\n",
            "1\n",
            "2\n",
            "3\n",
            "4\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "G4fw7NE1RQNf"
      },
      "source": [
        "## Creating an input transform\n",
        "\n",
        "For a nicer interface, we could abstract the `Create` and the `FlatMap` into a custom `PTransform`. This would give a more intuitive way to use it, while hiding the inner workings.\n",
        "\n",
        "We need a new class that inherits from `beam.PTransform`. We can do this more conveniently with the\n",
        "[`beam.ptransform_fn`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.ptransform.html#apache_beam.transforms.ptransform.ptransform_fn) decorator.\n",
        "\n",
        "The `PTransform` function takes the input `PCollection` as the first argument, and any other inputs from the generator function, like `n`, can be arguments to the `PTransform` as well. The original generator function can be defined locally within the `PTransform`.\n",
        "Finally, we apply the `Create` and `FlatMap` transforms and return a new `PCollection`.\n",
        "\n",
        "We can also, optionally, add type hints with the [`with_input_types`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.ptransform.html#apache_beam.transforms.ptransform.PTransform.with_input_types) and [`with_output_types`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.ptransform.html#apache_beam.transforms.ptransform.PTransform.with_output_types) decorators. They serve both as documentation, and are a way to ensure your data types are consistent throughout your pipeline. This becomes more useful as the complexity grows.\n",
        "\n",
        "Since our `PTransform` is expected to be the first transform in the pipeline, it doesn't receive any inputs. We can mark it as the beginning with the [`PBegin`](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/pvalue.html) type hint.\n",
        "\n",
        "Finally, to enable type checking, you can pass `--type_check_additional=all` when running your pipeline. Alternatively, you can also pass it directly to `PipelineOptions` if you want them enabled by default. To learn more about pipeline options, see [Configuring pipeline options](https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "m8iXqE1CRnn5",
        "outputId": "d77fd363-76eb-49ce-8729-82d6cd38cfda"
      },
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "from typing import Iterable\n",
        "\n",
        "@beam.ptransform_fn\n",
        "@beam.typehints.with_input_types(beam.pvalue.PBegin)\n",
        "@beam.typehints.with_output_types(int)\n",
        "def Count(pbegin: beam.pvalue.PBegin, n: int) -> beam.PCollection[int]:\n",
        "  def count(n: int) -> Iterable[int]:\n",
        "    for i in range(n):\n",
        "      yield i\n",
        "\n",
        "  return (\n",
        "      pbegin\n",
        "      | 'Create inputs' >> beam.Create([n])\n",
        "      | 'Generate elements' >> beam.FlatMap(count)\n",
        "  )\n",
        "\n",
        "n = 5\n",
        "options = PipelineOptions(flags=[], type_check_additional='all')\n",
        "with beam.Pipeline(options=options) as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | f'Count to {n}' >> Count(n)\n",
        "      | 'Print elements' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "0\n",
            "1\n",
            "2\n",
            "3\n",
            "4\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "e02_vFmUg-mK"
      },
      "source": [
        "## Example: Reading CSV files\n",
        "\n",
        "Lets say we want to read CSV files to get elements as Python dictionaries. We like how `ReadFromText` expands a file pattern, but we might want to allow for multiple patterns as well.\n",
        "\n",
        "We create a `ReadCsvFiles` transform, which takes a list of `file_patterns` as input. It expands all the `glob` patterns, and then, for each file name it reads each row as a `dict` using the\n",
        "[`csv.DictReader`](https://docs.python.org/3/library/csv.html#csv.DictReader) module.\n",
        "\n",
        "We could use the [`open`](https://docs.python.org/3/library/functions.html#open) function to open a local file, but Beam already supports several different file systems besides local files.\n",
        "To leverage that, we can use the [`apache_beam.io.filesystems`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html) module.\n",
        "\n",
        "> ℹ️ The [`open`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.open)\n",
        "> function from the Beam filesystem reads bytes,\n",
        "> it's roughly equivalent to opening a file in `rb` mode.\n",
        "> To write a file, you would use\n",
        "> [`create`](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filesystems.html#apache_beam.io.filesystems.FileSystems.open) instead."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "ywVbJxegaZbo",
        "outputId": "8dd0fdf3-43e8-47db-8442-ed9e88ef6c95"
      },
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.io.filesystems import FileSystems as beam_fs\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "import codecs\n",
        "import csv\n",
        "from typing import Dict, Iterable, List\n",
        "\n",
        "@beam.ptransform_fn\n",
        "@beam.typehints.with_input_types(beam.pvalue.PBegin)\n",
        "@beam.typehints.with_output_types(Dict[str, str])\n",
        "def ReadCsvFiles(pbegin: beam.pvalue.PBegin, file_patterns: List[str]) -> beam.PCollection[Dict[str, str]]:\n",
        "  def expand_pattern(pattern: str) -> Iterable[str]:\n",
        "    for match_result in beam_fs.match([pattern])[0].metadata_list:\n",
        "      yield match_result.path\n",
        "\n",
        "  def read_csv_lines(file_name: str) -> Iterable[Dict[str, str]]:\n",
        "    with beam_fs.open(file_name) as f:\n",
        "      # Beam reads files as bytes, but csv expects strings,\n",
        "      # so we need to decode the bytes into utf-8 strings.\n",
        "      for row in csv.DictReader(codecs.iterdecode(f, 'utf-8')):\n",
        "        yield dict(row)\n",
        "\n",
        "  return (\n",
        "      pbegin\n",
        "      | 'Create file patterns' >> beam.Create(file_patterns)\n",
        "      | 'Expand file patterns' >> beam.FlatMap(expand_pattern)\n",
        "      | 'Read CSV lines' >> beam.FlatMap(read_csv_lines)\n",
        "  )\n",
        "\n",
        "input_patterns = ['data/*.csv']\n",
        "options = PipelineOptions(flags=[], type_check_additional='all')\n",
        "with beam.Pipeline(options=options) as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Read CSV files' >> ReadCsvFiles(input_patterns)\n",
        "      | 'Print elements' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "{'species': '0', 'culmen_length_mm': '0.2545454545454545', 'culmen_depth_mm': '0.6666666666666666', 'flipper_length_mm': '0.15254237288135594', 'body_mass_g': '0.2916666666666667'}\n",
            "{'species': '0', 'culmen_length_mm': '0.26909090909090905', 'culmen_depth_mm': '0.5119047619047618', 'flipper_length_mm': '0.23728813559322035', 'body_mass_g': '0.3055555555555556'}\n",
            "{'species': '1', 'culmen_length_mm': '0.5236363636363636', 'culmen_depth_mm': '0.5714285714285713', 'flipper_length_mm': '0.3389830508474576', 'body_mass_g': '0.2222222222222222'}\n",
            "{'species': '1', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.7619047619047619', 'flipper_length_mm': '0.4067796610169492', 'body_mass_g': '0.3333333333333333'}\n",
            "{'species': '2', 'culmen_length_mm': '0.509090909090909', 'culmen_depth_mm': '0.011904761904761862', 'flipper_length_mm': '0.6610169491525424', 'body_mass_g': '0.5'}\n",
            "{'species': '2', 'culmen_length_mm': '0.6509090909090909', 'culmen_depth_mm': '0.38095238095238104', 'flipper_length_mm': '0.9830508474576272', 'body_mass_g': '0.8333333333333334'}\n"
          ],
          "name": "stdout"
        }
      ]
    },
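    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Here's a minimal standalone sketch of the Beam filesystem functions mentioned above, outside of any pipeline. This isn't a built-in I/O transform, just direct calls to the `FileSystems` module: `create` gives us a byte-oriented write channel, and `open` reads the bytes back. We assume the `outputs/` directory already exists, since the `WriteToText` example created it earlier in this notebook."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "from apache_beam.io.filesystems import FileSystems as beam_fs\n",
        "\n",
        "# `create` writes bytes, so we encode the string before writing.\n",
        "# We assume the outputs/ directory exists (WriteToText created it above).\n",
        "file_name = 'outputs/filesystems-example.txt'\n",
        "with beam_fs.create(file_name) as f:\n",
        "  f.write('Hello from the Beam filesystems module!'.encode('utf-8'))\n",
        "\n",
        "# `open` also returns bytes, like opening a file in 'rb' mode.\n",
        "with beam_fs.open(file_name) as f:\n",
        "  print(f.read().decode('utf-8'))"
      ],
      "execution_count": null,
      "outputs": []
    },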
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ZyzB_RO9Vs1D"
      },
      "source": [
        "## Example: Reading from a SQLite database\n",
        "\n",
        "Lets begin by creating a small SQLite local database file.\n",
        "\n",
        "Run the _\"Creating the SQLite database\"_ cell to create a new SQLite3 database with the filename you choose. You can double-click it to see the source code if you want."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "EJ58A0AoV02o",
        "cellView": "form",
        "outputId": "f932e834-8d65-4ddc-a4f8-1fc825c30b41"
      },
      "source": [
        "#@title Creating the SQLite database\n",
        "import sqlite3\n",
        "\n",
        "database_file = \"moon-phases.db\" #@param {type:\"string\"}\n",
        "\n",
        "with sqlite3.connect(database_file) as db:\n",
        "  cursor = db.cursor()\n",
        "\n",
        "  # Create the moon_phases table.\n",
        "  cursor.execute('''\n",
        "    CREATE TABLE IF NOT EXISTS moon_phases (\n",
        "      id INTEGER PRIMARY KEY,\n",
        "      phase_emoji TEXT NOT NULL,\n",
        "      peak_datetime DATETIME NOT NULL,\n",
        "      phase TEXT NOT NULL)''')\n",
        "\n",
        "  # Truncate the table if it's already populated.\n",
        "  cursor.execute('DELETE FROM moon_phases')\n",
        "\n",
        "  # Insert some sample data.\n",
        "  insert_moon_phase = 'INSERT INTO moon_phases(phase_emoji, peak_datetime, phase) VALUES(?, ?, ?)'\n",
        "  cursor.execute(insert_moon_phase, ('🌕', '2017-12-03 15:47:00', 'Full Moon'))\n",
        "  cursor.execute(insert_moon_phase, ('🌗', '2017-12-10 07:51:00', 'Last Quarter'))\n",
        "  cursor.execute(insert_moon_phase, ('🌑', '2017-12-18 06:30:00', 'New Moon'))\n",
        "  cursor.execute(insert_moon_phase, ('🌓', '2017-12-26 09:20:00', 'First Quarter'))\n",
        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-02 02:24:00', 'Full Moon'))\n",
        "  cursor.execute(insert_moon_phase, ('🌗', '2018-01-08 22:25:00', 'Last Quarter'))\n",
        "  cursor.execute(insert_moon_phase, ('🌑', '2018-01-17 02:17:00', 'New Moon'))\n",
        "  cursor.execute(insert_moon_phase, ('🌓', '2018-01-24 22:20:00', 'First Quarter'))\n",
        "  cursor.execute(insert_moon_phase, ('🌕', '2018-01-31 13:27:00', 'Full Moon'))\n",
        "\n",
        "  # Query for the data in the table to make sure it's populated.\n",
        "  cursor.execute('SELECT * FROM moon_phases')\n",
        "  for row in cursor.fetchall():\n",
        "    print(row)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "(1, '🌕', '2017-12-03 15:47:00', 'Full Moon')\n",
            "(2, '🌗', '2017-12-10 07:51:00', 'Last Quarter')\n",
            "(3, '🌑', '2017-12-18 06:30:00', 'New Moon')\n",
            "(4, '🌓', '2017-12-26 09:20:00', 'First Quarter')\n",
            "(5, '🌕', '2018-01-02 02:24:00', 'Full Moon')\n",
            "(6, '🌗', '2018-01-08 22:25:00', 'Last Quarter')\n",
            "(7, '🌑', '2018-01-17 02:17:00', 'New Moon')\n",
            "(8, '🌓', '2018-01-24 22:20:00', 'First Quarter')\n",
            "(9, '🌕', '2018-01-31 13:27:00', 'Full Moon')\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8y-bRhPVWai6"
      },
      "source": [
        "We could use a `FlatMap` transform to receive a SQL query and `yield` each result row, but that would mean creating a new database connection for each query. If we generated a large number of queries, creating that many connections could be a bottleneck.\n",
        "\n",
        "It would be nice to create the database connection only once for each worker, and every query could use the same connection if needed.\n",
        "\n",
        "We can use a\n",
        "[custom `DoFn` transform](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/#example-3-pardo-with-dofn-methods)\n",
        "for this. It allows us to open and close resources, like the database connection, only _once_ per `DoFn` _instance_ by using the `setup` and `teardown` methods.\n",
        "\n",
        "> ℹ️ It should be safe to _read_ from a database with multiple concurrent processes using the same connection, but only one process should be _writing_ at once."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "Bnpwqr-NV5DF",
        "outputId": "5f69a99c-c711-47cf-f13a-c780de57f3e6"
      },
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "import sqlite3\n",
        "from typing import Iterable, List, Tuple\n",
        "\n",
        "class SQLiteSelect(beam.DoFn):\n",
        "  def __init__(self, database_file: str):\n",
        "    self.database_file = database_file\n",
        "    self.connection = None\n",
        "\n",
        "  def setup(self):\n",
        "    self.connection = sqlite3.connect(self.database_file)\n",
        "\n",
        "  def process(self, query: Tuple[str, List[str]]) -> Iterable[Dict[str, str]]:\n",
        "    table, columns = query\n",
        "    cursor = self.connection.cursor()\n",
        "    cursor.execute(f\"SELECT {','.join(columns)} FROM {table}\")\n",
        "    for row in cursor.fetchall():\n",
        "      yield dict(zip(columns, row))\n",
        "\n",
        "  def teardown(self):\n",
        "    self.connection.close()\n",
        "\n",
        "@beam.ptransform_fn\n",
        "@beam.typehints.with_input_types(beam.pvalue.PBegin)\n",
        "@beam.typehints.with_output_types(Dict[str, str])\n",
        "def SelectFromSQLite(\n",
        "    pbegin: beam.pvalue.PBegin,\n",
        "    database_file: str,\n",
        "    queries: List[Tuple[str, List[str]]],\n",
        ") -> beam.PCollection[Dict[str, str]]:\n",
        "  return (\n",
        "      pbegin\n",
        "      | 'Create None' >> beam.Create(queries)\n",
        "      | 'SQLite SELECT' >> beam.ParDo(SQLiteSelect(database_file))\n",
        "  )\n",
        "\n",
        "queries = [\n",
        "    # (table_name, [column1, column2, ...])\n",
        "    ('moon_phases', ['phase_emoji', 'peak_datetime', 'phase']),\n",
        "    ('moon_phases', ['phase_emoji', 'phase']),\n",
        "]\n",
        "\n",
        "options = PipelineOptions(flags=[], type_check_additional='all')\n",
        "with beam.Pipeline(options=options) as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Read from SQLite' >> SelectFromSQLite(database_file, queries)\n",
        "      | 'Print rows' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "{'phase_emoji': '🌕', 'peak_datetime': '2017-12-03 15:47:00', 'phase': 'Full Moon'}\n",
            "{'phase_emoji': '🌗', 'peak_datetime': '2017-12-10 07:51:00', 'phase': 'Last Quarter'}\n",
            "{'phase_emoji': '🌑', 'peak_datetime': '2017-12-18 06:30:00', 'phase': 'New Moon'}\n",
            "{'phase_emoji': '🌓', 'peak_datetime': '2017-12-26 09:20:00', 'phase': 'First Quarter'}\n",
            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-02 02:24:00', 'phase': 'Full Moon'}\n",
            "{'phase_emoji': '🌗', 'peak_datetime': '2018-01-08 22:25:00', 'phase': 'Last Quarter'}\n",
            "{'phase_emoji': '🌑', 'peak_datetime': '2018-01-17 02:17:00', 'phase': 'New Moon'}\n",
            "{'phase_emoji': '🌓', 'peak_datetime': '2018-01-24 22:20:00', 'phase': 'First Quarter'}\n",
            "{'phase_emoji': '🌕', 'peak_datetime': '2018-01-31 13:27:00', 'phase': 'Full Moon'}\n",
            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n",
            "{'phase_emoji': '🌗', 'phase': 'Last Quarter'}\n",
            "{'phase_emoji': '🌑', 'phase': 'New Moon'}\n",
            "{'phase_emoji': '🌓', 'phase': 'First Quarter'}\n",
            "{'phase_emoji': '🌕', 'phase': 'Full Moon'}\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "C5Mx_pfNpu_q"
      },
      "source": [
        "# Writing data\n",
        "\n",
        "Your might want to write your data in various output formats. Take a look at the\n",
        "[Built-in I/O Transforms](https://beam.apache.org/documentation/io/built-in)\n",
        "page for a list of all the available I/O transforms in Beam.\n",
        "\n",
        "If none of those work for you, you might need to create your own output transform.\n",
        "\n",
        "> ℹ️ For a more in-depth guide, take a look at the\n",
        "[Developing a new I/O connector](https://beam.apache.org/documentation/io/developing-io-overview) page."
      ]
    },
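    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As an illustration, here's a minimal sketch of a custom output transform, built the same way as the custom input transforms above. `WriteOneFilePerElement` is a hypothetical name, not a built-in Beam API, and writing one file per element is only practical for small outputs. It uses the filesystem `create` function we saw earlier, so it works with any file system Beam supports, and a unique file name suffix keeps parallel workers from overwriting each other's files."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import uuid\n",
        "\n",
        "import apache_beam as beam\n",
        "from apache_beam.io.filesystems import FileSystems as beam_fs\n",
        "\n",
        "@beam.ptransform_fn\n",
        "@beam.typehints.with_input_types(str)\n",
        "@beam.typehints.with_output_types(str)\n",
        "def WriteOneFilePerElement(pcollection: beam.PCollection[str], file_name_prefix: str) -> beam.PCollection[str]:\n",
        "  def write_element(element: str) -> str:\n",
        "    # A unique suffix avoids file name collisions between parallel workers.\n",
        "    # The outputs/ directory is assumed to exist (created earlier in this notebook).\n",
        "    file_name = f'{file_name_prefix}-{uuid.uuid4().hex}.txt'\n",
        "    with beam_fs.create(file_name) as f:\n",
        "      f.write(element.encode('utf-8'))\n",
        "    return file_name\n",
        "\n",
        "  return pcollection | 'Write elements' >> beam.Map(write_element)\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Create elements' >> beam.Create(['first element', 'second element'])\n",
        "      | 'Write files' >> WriteOneFilePerElement('outputs/element')\n",
        "      | 'Print file names' >> beam.Map(print)\n",
        "  )"
      ],
      "execution_count": null,
      "outputs": []
    },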
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gnoz_mWtxSjW"
      },
      "source": [
        "# What's next?\n",
        "\n",
        "* ![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png)\n",
        "  [Windowing](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/windowing.ipynb) --\n",
        "  how process data based on time intervals. \n",
        "* [Programming guide](https://beam.apache.org/documentation/programming-guide) -- learn about all the Apache Beam concepts in more depth.\n",
        "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview) -- check out all the available transforms.\n",
        "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example) -- learn more about windowing, triggers, and streaming through a complete example pipeline.\n",
        "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix) -- check the available runners, their capabilities, and how to run your pipeline in them."
      ]
    }
  ]
}